package com.faner4cloud.yun.common.util.batch;

import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Synchronous batch executor: splits an array into consecutive slices of at most
 * {@code batchSize} elements and passes each slice, in order and on the calling thread,
 * to a caller-supplied handler.
 *
 * @param <T> element type of the data being batched
 */
final class SynchronousBatch<T> implements Batchable<T> {
	/**
	 * Maximum number of elements per batch.
	 */
	private final int batchSize;
	/**
	 * Source data to be split into batches.
	 */
	private final T[] data;

	/**
	 * Creates a batch executor over the given data.
	 *
	 * @param data      source data; {@code null} is treated as "no data"
	 * @param batchSize maximum number of elements per batch; must be positive whenever
	 *                  {@code data} is non-empty (checked when a batch operation runs)
	 */
	SynchronousBatch(T[] data, int batchSize) {
		this.data = data;
		this.batchSize = batchSize;
	}

	/**
	 * Runs the handler on every batch and sums the returned integers.
	 * Batches for which the handler returns {@code null} contribute nothing to the sum.
	 *
	 * @param batchHandler function invoked once per batch
	 * @return the sum of all non-null handler results, or 0 when there is no data
	 * @throws IllegalArgumentException if there is data to process and the batch size is not positive
	 */
	@Override
	public int sumEach(Function<Batchable.BatchContext<T>, Integer> batchHandler) {
		return doBatch(batchHandler).reduce(0, Integer::sum);
	}

	/**
	 * Runs the handler on every batch and collects the non-null results into a list,
	 * in batch order.
	 *
	 * @param batchHandler function invoked once per batch
	 * @return list of non-null handler results; empty when there is no data
	 * @throws IllegalArgumentException if there is data to process and the batch size is not positive
	 */
	@Override
	public <R> List<R> listEach(Function<BatchContext<T>, R> batchHandler) {
		return doBatch(batchHandler).collect(Collectors.toList());
	}

	/**
	 * Runs the handler on every batch and returns the non-null results as a stream.
	 * All batches are processed before this method returns; the stream only replays
	 * the already-computed results.
	 *
	 * @param batchHandler function invoked once per batch
	 * @return stream of non-null handler results, in batch order
	 * @throws IllegalArgumentException if there is data to process and the batch size is not positive
	 */
	@Override
	public <R> Stream<R> each(Function<Batchable.BatchContext<T>, R> batchHandler) {
		return doBatch(batchHandler);
	}

	/**
	 * Passes every batch to the consumer, in order, without collecting any result.
	 *
	 * @param batchConsumer consumer invoked once per batch
	 * @throws IllegalArgumentException if there is data to process and the batch size is not positive
	 */
	@Override
	public void foreach(Consumer<BatchContext<T>> batchConsumer) {
		forEachBatch(batchConsumer);
	}

	/**
	 * Runs the handler on every batch and returns the collected results as a stream.
	 *
	 * @param batchHandler function invoked once per batch
	 * @return {@code Stream<R>} of the non-null handler results; empty when there is no data
	 */
	private <R> Stream<R> doBatch(Function<BatchContext<T>, R> batchHandler) {
		Stream.Builder<R> results = Stream.builder();
		forEachBatch(context -> {
			R value = batchHandler.apply(context);
			// null results are skipped
			if (value != null) {
				results.add(value);
			}
		});
		return results.build();
	}

	/**
	 * Single implementation of the batching loop shared by all public operations.
	 * Each context is built from: a fixed-size list holding a copy of the slice, the total
	 * element count, the configured batch size, the 1-based batch number, and a flag that
	 * is {@code true} only for the final batch.
	 *
	 * @param action callback invoked once per batch
	 * @throws IllegalArgumentException if there is data to process and the batch size is not positive
	 */
	private void forEachBatch(Consumer<BatchContext<T>> action) {
		int total = this.data == null ? 0 : this.data.length;
		if (total == 0) {
			return;
		}
		// A batch size of 0 would never advance the cursor (endless loop); a negative one
		// would produce an invalid copy range. Reject both with a clear message.
		if (this.batchSize <= 0) {
			throw new IllegalArgumentException(
				"batchSize must be greater than 0, but was " + this.batchSize);
		}
		int batchNo = 0;
		int fromIndex = 0;
		while (fromIndex < total) {
			batchNo++;
			// Derive the slice length from the remaining element count so that no
			// intermediate sum can exceed the array length (avoids int overflow).
			int count = Math.min(this.batchSize, total - fromIndex);
			int toIndex = fromIndex + count;
			List<T> slice = Arrays.asList(Arrays.copyOfRange(this.data, fromIndex, toIndex));
			boolean last = toIndex >= total;
			fromIndex = toIndex;
			action.accept(new BatchContext<T>(slice, total, this.batchSize, batchNo, last));
		}
	}
}
